package drds.data_propagate.sink.entry.group;

import drds.data_propagate.entry.EntryType;
import drds.data_propagate.sink.exception.SinkException;
import drds.data_propagate.store.Event;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 相比于{@linkplain TimelineBarrier}，增加了按事务支持，会按照事务进行分库合并处理
 */
public class TransactionTimelineBarrier extends TimelineBarrier {

    //
    public static final int transaction_status_init = 0;
    public static final int transaction_status_transaction = 1;
    public static final int transaction_status_nontransaction = 2;
    private ThreadLocal<Boolean> inTransaction = new ThreadLocal() {

        protected Object initialValue() {
            return false;
        }
    };
    /**
     * <pre>
     * 几种状态：
     * 0：初始状态，允许大家竞争
     * 1: 事务数据处理中
     * 2: 非事务数据处理中
     * </pre>
     */
    private AtomicInteger transactionStatus = new AtomicInteger(0);

    public TransactionTimelineBarrier(int groupSize) {
        super(groupSize);
    }

    public void await(Event event) throws InterruptedException {
        try {
            super.await(event);
        } catch (InterruptedException e) {
            // 出现线程中断，可能是因为关闭或者主备切换
            // 主备切换对应的事务尾会未正常发送，需要强制设置为事务结束，允许其他队列通过
            reset();
            throw e;
        }
    }

    public void await(Event event, long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
        try {
            super.await(event, timeout, timeUnit);
        } catch (InterruptedException e) {
            // 出现线程中断，可能是因为关闭或者主备切换
            // 主备切换对应的事务尾会未正常发送，需要强制设置为事务结束，允许其他队列通过
            reset();
            throw e;
        }
    }

    public void interrupt() {
        super.interrupt();
        reset();
    }

    // 重新设置状态
    private void reset() {
        inTransaction.remove();
        transactionStatus.set(0);// 重新置位
    }

    //
    protected boolean isAllowPass(Event event, long executeTime) {
        if (transactionStatus.intValue() == transaction_status_transaction && inTransaction.get()) { // 如果处于事务中，直接允许通过。因为事务头已经做过判断
            return true;
        } else if (transactionStatus.intValue() == transaction_status_transaction && !inTransaction.get()) {
            return false;//瞬间状态改变
        } else if (transactionStatus.intValue() == transaction_status_init) {
            boolean result = super.isAllowPass(event, executeTime);
            if (!result) {
                return false;//next time try
            } else {
                // 可能第一条送过来的数据不为Begin，需要做判断处理，如果非事务，允许直接通过，比如DDL语句
                if (event.getEntryType() == EntryType.transaction_begin) {
                    if (transactionStatus.compareAndSet(transaction_status_init, transaction_status_transaction)) {
                        inTransaction.set(true);//瞬间状态改变
                        return true; // 事务允许通过
                    } else {
                        return false;//next time try
                    }
                } else {
                    if (transactionStatus.compareAndSet(transaction_status_init, transaction_status_nontransaction)) { // 非事务保护中
                        // 当基于zk-cursor启动的时候，拿到的第一个Event是TransactionEnd
                        return true; // DDL/DCL/TransactionEnd允许通过
                    } else {
                        return false;//next time try
                    }
                }
            }
        } else {
            throw new IllegalStateException("else");
        }
    }

    public void clear(Event event) {
        super.clear(event);


        if (transactionStatus.intValue() == transaction_status_transaction) {// 事务中
            if (!(event.getEntryType() == EntryType.transaction_end)) {
                //什么也不做
            } else {
                inTransaction.set(false); // 事务结束并且已经成功写入store，清理标记，进入重新排队判断，允许新的事务进入
                boolean result = transactionStatus.compareAndSet(transaction_status_transaction, transaction_status_init);
                if (result == false) {
                    throw new SinkException("getMinExecuteTime is not correct in transaction");
                }
            }
            //event乱序执行,可能导致事务结束事件在事务体外存在(0->2)的情况
        } else if (transactionStatus.intValue() == transaction_status_nontransaction) {// 非事务中
            boolean result = transactionStatus.compareAndSet(transaction_status_nontransaction, transaction_status_init);
            if (result == false) {
                throw new SinkException("getMinExecuteTime is not correct in non-transaction");
            }
            //transactionStatus.intValue() == transaction_status_transaction
            //1transaction_end,2!transaction_end
        }
    }


}
